Download Data into local for last Ten Years (2014-2024)¶

In [12]:
!pip install plotly
WARNING: The directory '/home/jovyan/.cache/pip' or its parent directory is not owned or is not writable by the current user. The cache has been disabled. Check the permissions and owner of that directory. If executing pip with sudo, you should use sudo's -H flag.
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: plotly in /home/rvasappanavara/.local/lib/python3.11/site-packages (6.0.1)
Requirement already satisfied: narwhals>=1.15.1 in /home/rvasappanavara/.local/lib/python3.11/site-packages (from plotly) (1.37.1)
Requirement already satisfied: packaging in /opt/conda/lib/python3.11/site-packages (from plotly) (23.2)
In [13]:
import os
import requests
from tqdm import tqdm
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import time
import psutil

# Create directory for data if it doesn't exist
output_dir = '/home/rvasappanavara/rvasappanavara/shared/nyc_taxi_data'
os.makedirs(output_dir, exist_ok=True)

# Show system resources
mem = psutil.virtual_memory()
print(f"System Memory: {mem.total / (1024**3):.1f} GB total, {mem.available / (1024**3):.1f} GB available")
print(f"CPU Cores: {psutil.cpu_count(logical=False)} physical, {psutil.cpu_count()} logical")
print(f"Disk space used: {psutil.disk_usage('/').free / (1024**3):.1f} GB")
System Memory: 251.5 GB total, 208.8 GB available
CPU Cores: 128 physical, 128 logical
Disk space used: 0.1 GB
In [14]:
def download_file(url, local_filename):
    """
    Download a file from a URL with a progress bar
    """
    try:
        with requests.get(url, stream=True, timeout=300) as r:
            r.raise_for_status()
            total_size = int(r.headers.get('content-length', 0))
            
            with open(local_filename, 'wb') as f, tqdm(
                    total=total_size,
                    unit='B',
                    unit_scale=True,
                    unit_divisor=1024,
                    desc=os.path.basename(local_filename)
                ) as progress_bar:
                
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
                    progress_bar.update(len(chunk))
        
        return {"status": "success", "path": local_filename, "size": os.path.getsize(local_filename)}
    except Exception as e:
        return {"status": "error", "path": local_filename, "error": str(e)}
In [15]:
def download_taxi_data(year, month, output_dir, base_url="https://d37ci6vzurychx.cloudfront.net/trip-data"):
    """
    Download a specific month of taxi data
    """
    month_str = f"{month:02d}"
    filename = f"yellow_tripdata_{year}-{month_str}.parquet"
    url = f"{base_url}/{filename}"
    local_path = os.path.join(output_dir, filename)
    
    # Skip if file already exists and is not empty
    if os.path.exists(local_path) and os.path.getsize(local_path) > 0:
        print(f"File already exists: {filename}, skipping...")
        return {"status": "skipped", "path": local_path, "size": os.path.getsize(local_path)}
    
    print(f"Downloading {filename}...")
    result = download_file(url, local_path)
    
    if result["status"] == "success":
        print(f"Successfully downloaded {filename} ({result['size'] / (1024*1024):.2f} MB)")
    else:
        print(f"Failed to download {filename}: {result['error']}")
    
    return result
In [16]:
# Define years and months to download
years = [2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]  # Adjust as needed
months = [1, 2, 3, 4, 5, 6, 7, 8, 9 , 10, 11, 12]  # Adjust as needed
max_workers = 4  # Number of concurrent downloads

# Download files in parallel
start_time = time.time()
download_tasks = [(year, month) for year in years for month in months]
results = {"successful": 0, "failed": 0, "skipped": 0, "total_size_mb": 0}

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = [
        executor.submit(download_taxi_data, year, month, output_dir) 
        for year, month in download_tasks
    ]
    
    for future in tqdm(futures, desc="Overall Progress", unit="file"):
        result = future.result()
        if result["status"] == "success":
            results["successful"] += 1
            results["total_size_mb"] += result["size"] / (1024 * 1024)
        elif result["status"] == "skipped":
            results["skipped"] += 1
            results["total_size_mb"] += result["size"] / (1024 * 1024)
        else:
            results["failed"] += 1
File already exists: yellow_tripdata_2014-01.parquet, skipping...
File already exists: yellow_tripdata_2014-02.parquet, skipping...
File already exists: yellow_tripdata_2014-03.parquet, skipping...
File already exists: yellow_tripdata_2014-04.parquet, skipping...
File already exists: yellow_tripdata_2014-05.parquet, skipping...
File already exists: yellow_tripdata_2014-06.parquet, skipping...
File already exists: yellow_tripdata_2014-07.parquet, skipping...
File already exists: yellow_tripdata_2014-08.parquet, skipping...
Overall Progress: 100%|██████████| 132/132 [00:00<00:00, 2043.68file/s]
File already exists: yellow_tripdata_2014-10.parquet, skipping...File already exists: yellow_tripdata_2014-12.parquet, skipping...

File already exists: yellow_tripdata_2014-11.parquet, skipping...
File already exists: yellow_tripdata_2015-02.parquet, skipping...
File already exists: yellow_tripdata_2015-03.parquet, skipping...
File already exists: yellow_tripdata_2015-01.parquet, skipping...
File already exists: yellow_tripdata_2015-04.parquet, skipping...
File already exists: yellow_tripdata_2015-05.parquet, skipping...
File already exists: yellow_tripdata_2015-06.parquet, skipping...
File already exists: yellow_tripdata_2015-07.parquet, skipping...
File already exists: yellow_tripdata_2015-08.parquet, skipping...
File already exists: yellow_tripdata_2015-09.parquet, skipping...
File already exists: yellow_tripdata_2015-10.parquet, skipping...
File already exists: yellow_tripdata_2015-12.parquet, skipping...
File already exists: yellow_tripdata_2016-01.parquet, skipping...
File already exists: yellow_tripdata_2016-02.parquet, skipping...
File already exists: yellow_tripdata_2016-03.parquet, skipping...
File already exists: yellow_tripdata_2016-04.parquet, skipping...
File already exists: yellow_tripdata_2016-05.parquet, skipping...
File already exists: yellow_tripdata_2016-06.parquet, skipping...
File already exists: yellow_tripdata_2016-07.parquet, skipping...
File already exists: yellow_tripdata_2016-08.parquet, skipping...
File already exists: yellow_tripdata_2016-09.parquet, skipping...
File already exists: yellow_tripdata_2016-10.parquet, skipping...
File already exists: yellow_tripdata_2016-12.parquet, skipping...
File already exists: yellow_tripdata_2017-01.parquet, skipping...
File already exists: yellow_tripdata_2017-02.parquet, skipping...
File already exists: yellow_tripdata_2016-11.parquet, skipping...
File already exists: yellow_tripdata_2017-04.parquet, skipping...
File already exists: yellow_tripdata_2017-05.parquet, skipping...
File already exists: yellow_tripdata_2017-06.parquet, skipping...
File already exists: yellow_tripdata_2017-07.parquet, skipping...
File already exists: yellow_tripdata_2017-08.parquet, skipping...
File already exists: yellow_tripdata_2017-09.parquet, skipping...
File already exists: yellow_tripdata_2017-10.parquet, skipping...
File already exists: yellow_tripdata_2017-11.parquet, skipping...
File already exists: yellow_tripdata_2017-12.parquet, skipping...
File already exists: yellow_tripdata_2018-01.parquet, skipping...
File already exists: yellow_tripdata_2018-02.parquet, skipping...
File already exists: yellow_tripdata_2018-03.parquet, skipping...
File already exists: yellow_tripdata_2018-04.parquet, skipping...
File already exists: yellow_tripdata_2018-05.parquet, skipping...
File already exists: yellow_tripdata_2018-06.parquet, skipping...
File already exists: yellow_tripdata_2018-07.parquet, skipping...
File already exists: yellow_tripdata_2018-08.parquet, skipping...
File already exists: yellow_tripdata_2018-09.parquet, skipping...
File already exists: yellow_tripdata_2018-10.parquet, skipping...
File already exists: yellow_tripdata_2018-11.parquet, skipping...
File already exists: yellow_tripdata_2018-12.parquet, skipping...
File already exists: yellow_tripdata_2019-01.parquet, skipping...
File already exists: yellow_tripdata_2019-02.parquet, skipping...
File already exists: yellow_tripdata_2019-03.parquet, skipping...
File already exists: yellow_tripdata_2019-04.parquet, skipping...
File already exists: yellow_tripdata_2019-05.parquet, skipping...
File already exists: yellow_tripdata_2019-06.parquet, skipping...
File already exists: yellow_tripdata_2019-07.parquet, skipping...
File already exists: yellow_tripdata_2019-08.parquet, skipping...
File already exists: yellow_tripdata_2019-09.parquet, skipping...
File already exists: yellow_tripdata_2019-10.parquet, skipping...
File already exists: yellow_tripdata_2019-11.parquet, skipping...
File already exists: yellow_tripdata_2019-12.parquet, skipping...
File already exists: yellow_tripdata_2020-01.parquet, skipping...
File already exists: yellow_tripdata_2020-02.parquet, skipping...
File already exists: yellow_tripdata_2020-03.parquet, skipping...
File already exists: yellow_tripdata_2020-04.parquet, skipping...
File already exists: yellow_tripdata_2020-05.parquet, skipping...
File already exists: yellow_tripdata_2020-06.parquet, skipping...
File already exists: yellow_tripdata_2020-07.parquet, skipping...
File already exists: yellow_tripdata_2020-08.parquet, skipping...
File already exists: yellow_tripdata_2020-09.parquet, skipping...
File already exists: yellow_tripdata_2020-10.parquet, skipping...
File already exists: yellow_tripdata_2020-11.parquet, skipping...
File already exists: yellow_tripdata_2020-12.parquet, skipping...
File already exists: yellow_tripdata_2021-01.parquet, skipping...
File already exists: yellow_tripdata_2021-02.parquet, skipping...
File already exists: yellow_tripdata_2021-03.parquet, skipping...
File already exists: yellow_tripdata_2021-04.parquet, skipping...
File already exists: yellow_tripdata_2021-05.parquet, skipping...
File already exists: yellow_tripdata_2021-06.parquet, skipping...
File already exists: yellow_tripdata_2021-07.parquet, skipping...
File already exists: yellow_tripdata_2021-08.parquet, skipping...
File already exists: yellow_tripdata_2021-09.parquet, skipping...
File already exists: yellow_tripdata_2021-10.parquet, skipping...
File already exists: yellow_tripdata_2021-11.parquet, skipping...
File already exists: yellow_tripdata_2021-12.parquet, skipping...
File already exists: yellow_tripdata_2022-01.parquet, skipping...
File already exists: yellow_tripdata_2022-02.parquet, skipping...
File already exists: yellow_tripdata_2022-03.parquet, skipping...
File already exists: yellow_tripdata_2022-04.parquet, skipping...
File already exists: yellow_tripdata_2022-05.parquet, skipping...
File already exists: yellow_tripdata_2022-06.parquet, skipping...
File already exists: yellow_tripdata_2022-07.parquet, skipping...
File already exists: yellow_tripdata_2022-08.parquet, skipping...
File already exists: yellow_tripdata_2022-09.parquet, skipping...
File already exists: yellow_tripdata_2022-10.parquet, skipping...
File already exists: yellow_tripdata_2022-11.parquet, skipping...
File already exists: yellow_tripdata_2022-12.parquet, skipping...
File already exists: yellow_tripdata_2023-01.parquet, skipping...
File already exists: yellow_tripdata_2023-02.parquet, skipping...
File already exists: yellow_tripdata_2023-03.parquet, skipping...
File already exists: yellow_tripdata_2023-04.parquet, skipping...
File already exists: yellow_tripdata_2023-05.parquet, skipping...
File already exists: yellow_tripdata_2023-06.parquet, skipping...
File already exists: yellow_tripdata_2023-07.parquet, skipping...
File already exists: yellow_tripdata_2023-08.parquet, skipping...
File already exists: yellow_tripdata_2023-09.parquet, skipping...
File already exists: yellow_tripdata_2023-10.parquet, skipping...
File already exists: yellow_tripdata_2023-11.parquet, skipping...
File already exists: yellow_tripdata_2023-12.parquet, skipping...
File already exists: yellow_tripdata_2024-01.parquet, skipping...
File already exists: yellow_tripdata_2024-02.parquet, skipping...
File already exists: yellow_tripdata_2024-03.parquet, skipping...
File already exists: yellow_tripdata_2024-04.parquet, skipping...
File already exists: yellow_tripdata_2024-05.parquet, skipping...
File already exists: yellow_tripdata_2024-06.parquet, skipping...
File already exists: yellow_tripdata_2014-09.parquet, skipping...
File already exists: yellow_tripdata_2024-08.parquet, skipping...
File already exists: yellow_tripdata_2024-09.parquet, skipping...
File already exists: yellow_tripdata_2024-10.parquet, skipping...
File already exists: yellow_tripdata_2024-11.parquet, skipping...
File already exists: yellow_tripdata_2024-12.parquet, skipping...
File already exists: yellow_tripdata_2024-07.parquet, skipping...
File already exists: yellow_tripdata_2015-11.parquet, skipping...
File already exists: yellow_tripdata_2017-03.parquet, skipping...

In [17]:
# Print summary
elapsed_time = time.time() - start_time
print("\n" + "="*50)
print("Download Summary:")
print(f"Elapsed time: {elapsed_time:.2f} seconds ({elapsed_time/60:.2f} minutes)")
print(f"Years processed: {years}")
print(f"Months processed: {', '.join([str(m) for m in months])}")
print(f"Files successfully downloaded: {results['successful']}")
print(f"Files skipped (already exist): {results['skipped']}")
print(f"Files failed to download: {results['failed']}")
print(f"Total data size: {results['total_size_mb']:.2f} MB ({results['total_size_mb']/1024:.2f} GB)")
print(f"Data saved to: {os.path.abspath(output_dir)}")
print("="*50)
==================================================
Download Summary:
Elapsed time: 0.10 seconds (0.00 minutes)
Years processed: [2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
Months processed: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
Files successfully downloaded: 0
Files skipped (already exist): 132
Files failed to download: 0
Total data size: 12501.75 MB (12.21 GB)
Data saved to: /home/rvasappanavara/rvasappanavara/shared/nyc_taxi_data
==================================================
In [18]:
import os
import time
import json
import psutil
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from datetime import datetime
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

Set Up Spark Environment¶

In [19]:
# Import necessary Spark libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Get system memory
import psutil
mem = psutil.virtual_memory()
total_mem_gb = mem.total / (1024**3)
cores = psutil.cpu_count()

# Data size from previous cell (if not available, set a default)
try:
    print(f"Configuring for {data_size_gb:.2f} GB of data")
except NameError:
    # Default estimate if data_size_gb is not defined from previous cell
    data_size_gb = 10
    print(f"Using default data size estimate of {data_size_gb} GB")

# Calculate optimal memory allocation
reserved_mem_gb = max(4, total_mem_gb * 0.2)  # Reserve memory for OS
available_mem_gb = total_mem_gb - reserved_mem_gb
driver_mem_gb = min(max(4, available_mem_gb * 0.25), 16)  # Max 16GB for driver
executor_mem_gb = max(4, available_mem_gb * 0.6)  # 60% of available memory

# Calculate optimal parallelism
shuffle_partitions = min(max(200, int(data_size_gb * 10)), 1000)
Using default data size estimate of 10 GB
In [20]:
# Create Spark session with optimized configuration
spark = SparkSession.builder \
    .appName("NYC Taxi Analysis") \
    .config("spark.driver.memory", f"{int(driver_mem_gb)}g") \
    .config("spark.executor.memory", f"{int(executor_mem_gb)}g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.shuffle.partitions", shuffle_partitions) \
    .config("spark.default.parallelism", cores * 2) \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024) \
    .getOrCreate()

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

# Print configuration summary
print("\nSpark Configuration Summary:")
print(f"Driver Memory: {int(driver_mem_gb)}g")
print(f"Executor Memory: {int(executor_mem_gb)}g")
print(f"Shuffle Partitions: {shuffle_partitions}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
print(f"Available Cores: {cores}")
print(f"System Memory: {total_mem_gb:.1f} GB")

# Test the connection
print("\nTesting Spark session...")
test_df = spark.range(1000).toDF("id")
count = test_df.count()
print(f"Test successful. Created DataFrame with {count} records.")
Spark Configuration Summary:
Driver Memory: 16g
Executor Memory: 120g
Shuffle Partitions: 200
Default Parallelism: 256
Available Cores: 128
System Memory: 251.5 GB

Testing Spark session...
Test successful. Created DataFrame with 1000 records.
In [21]:
# Import PySpark functions with clear aliasing
from pyspark.sql.functions import (
    hour as hour_func,
    dayofweek as dayofweek_func,
    month as month_func,
    year as year_func,
    to_date as to_date_func,
    when as when_func,
    col as col_func,  # Now col is renamed to col_func
    unix_timestamp as unix_timestamp_func,
    avg as avg_func
)
# Import other required libraries
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import os

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")
print("Imports completed successfully")
Imports completed successfully

Loading and Exploring the NYC Taxi Data¶

In [22]:
# Path to the downloaded data
data_path = "nyc_taxi_data/"  # Adjust this path to where you stored the data
year = 2022
month = 1  # January

# Construct the file path
file_path = f"{data_path}yellow_tripdata_{year}-{month:02d}.parquet"

# Check if the file exists
if os.path.exists(file_path):
    print(f"File found: {file_path}")
else:
    print(f"File not found: {file_path}")
    print(f"Current working directory: {os.getcwd()}")
    print("Please check the file path and try again.")

# Load the Parquet file into a Spark DataFrame
try:
    taxi_df = spark.read.parquet(file_path)
    print(f"Successfully loaded data with {taxi_df.count()} records and {len(taxi_df.columns)} columns")
except Exception as e:
    print(f"Error loading data: {str(e)}")
    
# Display schema
print("\nDataset Schema:")
taxi_df.printSchema()

# Show a sample of the data
print("\nSample Data:")
taxi_df.show(5)
File found: nyc_taxi_data/yellow_tripdata_2022-01.parquet
Successfully loaded data with 2463931 records and 19 columns

Dataset Schema:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)


Sample Data:
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-01 00:35:40|  2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.65|         0.0|                  0.3|       21.95|                 2.5|        0.0|
|       1| 2022-01-01 00:33:43|  2022-01-01 00:42:07|            1.0|          2.1|       1.0|                 N|         236|          42|           1|        8.0|  0.5|    0.5|       4.0|         0.0|                  0.3|        13.3|                 0.0|        0.0|
|       2| 2022-01-01 00:53:21|  2022-01-01 01:02:19|            1.0|         0.97|       1.0|                 N|         166|         166|           1|        7.5|  0.5|    0.5|      1.76|         0.0|                  0.3|       10.56|                 0.0|        0.0|
|       2| 2022-01-01 00:25:21|  2022-01-01 00:35:23|            1.0|         1.09|       1.0|                 N|         114|          68|           2|        8.0|  0.5|    0.5|       0.0|         0.0|                  0.3|        11.8|                 2.5|        0.0|
|       2| 2022-01-01 00:36:48|  2022-01-01 01:14:20|            1.0|          4.3|       1.0|                 N|          68|         163|           1|       23.5|  0.5|    0.5|       3.0|         0.0|                  0.3|        30.3|                 2.5|        0.0|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
only showing top 5 rows

Understand the Data Structure¶

In [23]:
# Path to the downloaded data
data_path = "nyc_taxi_data/"  # Adjust this path to where you stored the data
year = 2022
month = 1  # January

# Construct the file path
file_path = f"{data_path}yellow_tripdata_{year}-{month:02d}.parquet"

# Check if the file exists
if os.path.exists(file_path):
    print(f"File found: {file_path}")
else:
    print(f"File not found: {file_path}")
    print(f"Current working directory: {os.getcwd()}")
    print("Please check the file path and try again.")

# Load the Parquet file into a Spark DataFrame
try:
    taxi_df = spark.read.parquet(file_path)
    print(f"Successfully loaded data with {taxi_df.count()} records and {len(taxi_df.columns)} columns")
except Exception as e:
    print(f"Error loading data: {str(e)}")
    
# Display schema
print("\nDataset Schema:")
taxi_df.printSchema()

# Show a sample of the data
print("\nSample Data:")
taxi_df.show(5)
File found: nyc_taxi_data/yellow_tripdata_2022-01.parquet
Successfully loaded data with 2463931 records and 19 columns

Dataset Schema:
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)


Sample Data:
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-01 00:35:40|  2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.65|         0.0|                  0.3|       21.95|                 2.5|        0.0|
|       1| 2022-01-01 00:33:43|  2022-01-01 00:42:07|            1.0|          2.1|       1.0|                 N|         236|          42|           1|        8.0|  0.5|    0.5|       4.0|         0.0|                  0.3|        13.3|                 0.0|        0.0|
|       2| 2022-01-01 00:53:21|  2022-01-01 01:02:19|            1.0|         0.97|       1.0|                 N|         166|         166|           1|        7.5|  0.5|    0.5|      1.76|         0.0|                  0.3|       10.56|                 0.0|        0.0|
|       2| 2022-01-01 00:25:21|  2022-01-01 00:35:23|            1.0|         1.09|       1.0|                 N|         114|          68|           2|        8.0|  0.5|    0.5|       0.0|         0.0|                  0.3|        11.8|                 2.5|        0.0|
|       2| 2022-01-01 00:36:48|  2022-01-01 01:14:20|            1.0|          4.3|       1.0|                 N|          68|         163|           1|       23.5|  0.5|    0.5|       3.0|         0.0|                  0.3|        30.3|                 2.5|        0.0|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
only showing top 5 rows

Creating Features for EDA¶

In [24]:
# Add temporal features
enhanced_df = taxi_df.withColumn("pickup_hour", hour_func("tpep_pickup_datetime")) \
    .withColumn("pickup_day", dayofweek_func("tpep_pickup_datetime")) \
    .withColumn("pickup_month", month_func("tpep_pickup_datetime")) \
    .withColumn("pickup_year", year_func("tpep_pickup_datetime")) \
    .withColumn("pickup_date", to_date_func("tpep_pickup_datetime")) \
    .withColumn("is_weekend", when_func(dayofweek_func("tpep_pickup_datetime").isin([1, 7]), True).otherwise(False)) \
    .withColumn("trip_duration_minutes", 
                (unix_timestamp_func("tpep_dropoff_datetime") - unix_timestamp_func("tpep_pickup_datetime")) / 60) \
    .withColumn("time_of_day", 
               when_func((hour_func("tpep_pickup_datetime") >= 6) & (hour_func("tpep_pickup_datetime") < 12), "Morning")
               .when((hour_func("tpep_pickup_datetime") >= 12) & (hour_func("tpep_pickup_datetime") < 18), "Afternoon")
               .when((hour_func("tpep_pickup_datetime") >= 18) & (hour_func("tpep_pickup_datetime") < 22), "Evening")
               .otherwise("Night")) \
    .withColumn("speed_mph", when_func(col_func("trip_duration_minutes") > 0, 
                                 col_func("trip_distance") / (col_func("trip_duration_minutes") / 60)).otherwise(0)) \
    .withColumn("tip_percentage", when_func(col_func("fare_amount") > 0, 
                                      (col_func("tip_amount") / col_func("fare_amount")) * 100).otherwise(0))

# Clean the data by filtering out extreme values
cleaned_df = enhanced_df.filter(
    (col_func("trip_distance") > 0) & (col_func("trip_distance") < 100) &
    (col_func("fare_amount") > 0) & (col_func("fare_amount") < 1000) &
    (col_func("passenger_count") > 0) & (col_func("passenger_count") < 10) &
    (col_func("trip_duration_minutes") > 0) & (col_func("trip_duration_minutes") < 180) &
    (col_func("speed_mph") < 100)  # Reasonable speed limit
)

# Check data size after cleaning
original_count = taxi_df.count()
cleaned_count = cleaned_df.count()
removed_percentage = ((original_count - cleaned_count) / original_count) * 100

print(f"Original dataset: {original_count} records")
print(f"Cleaned dataset: {cleaned_count} records")
print(f"Removed {original_count - cleaned_count} records ({removed_percentage:.2f}%)")

# Show the enhanced dataset
print("\nEnhanced Dataset Sample:")
cleaned_df.select("tpep_pickup_datetime", "pickup_hour", "time_of_day", "is_weekend", 
                 "trip_distance", "trip_duration_minutes", "speed_mph", 
                 "fare_amount", "tip_amount", "tip_percentage").show(5)
Original dataset: 2463931 records
Cleaned dataset: 2297632 records
Removed 166299 records (6.75%)

Enhanced Dataset Sample:
+--------------------+-----------+-----------+----------+-------------+---------------------+------------------+-----------+----------+------------------+
|tpep_pickup_datetime|pickup_hour|time_of_day|is_weekend|trip_distance|trip_duration_minutes|         speed_mph|fare_amount|tip_amount|    tip_percentage|
+--------------------+-----------+-----------+----------+-------------+---------------------+------------------+-----------+----------+------------------+
| 2022-01-01 00:35:40|          0|      Night|      true|          3.8|   17.816666666666666|12.797006548175863|       14.5|      3.65| 25.17241379310345|
| 2022-01-01 00:33:43|          0|      Night|      true|          2.1|                  8.4|              15.0|        8.0|       4.0|              50.0|
| 2022-01-01 00:53:21|          0|      Night|      true|         0.97|    8.966666666666667|6.4907063197026025|        7.5|      1.76|23.466666666666665|
| 2022-01-01 00:25:21|          0|      Night|      true|         1.09|   10.033333333333333|  6.51827242524917|        8.0|       0.0|               0.0|
| 2022-01-01 00:36:48|          0|      Night|      true|          4.3|    37.53333333333333| 6.873889875666075|       23.5|       3.0| 12.76595744680851|
+--------------------+-----------+-----------+----------+-------------+---------------------+------------------+-----------+----------+------------------+
only showing top 5 rows

Basic Statistics and Data Quality Insights¶

In [25]:
# Check data size after cleaning
original_count = taxi_df.count()
enhanced_count = enhanced_df.count()
cleaned_count = cleaned_df.count()

print(f"Original dataset: {original_count} records")
print(f"Enhanced dataset: {enhanced_count} records")
print(f"Cleaned dataset: {cleaned_count} records")
print(f"Removed {original_count - cleaned_count} records ({(original_count - cleaned_count) / original_count * 100:.2f}%)")

# Basic statistics for key numeric columns
numeric_columns = ["trip_distance", "fare_amount", "tip_amount", "trip_duration_minutes", 
                   "speed_mph", "tip_percentage", "passenger_count"]

print("\nBasic Statistics for Key Metrics:")
cleaned_df.select(numeric_columns).summary("count", "min", "25%", "mean", "50%", "75%", "max").show()
Original dataset: 2463931 records
Enhanced dataset: 2463931 records
Cleaned dataset: 2297632 records
Removed 166299 records (6.75%)

Basic Statistics for Key Metrics:
+-------+------------------+------------------+------------------+---------------------+--------------------+------------------+------------------+
|summary|     trip_distance|       fare_amount|        tip_amount|trip_duration_minutes|           speed_mph|    tip_percentage|   passenger_count|
+-------+------------------+------------------+------------------+---------------------+--------------------+------------------+------------------+
|  count|           2297632|           2297632|           2297632|              2297632|             2297632|           2297632|           2297632|
|    min|              0.01|              0.01|               0.0| 0.016666666666666666|0.003899057727715802|               0.0|               1.0|
|    25%|              1.07|               6.5|               1.0|    6.383333333333334|   8.756756756756756|7.6923076923076925|               1.0|
|   mean|3.1341296430411294|12.627766853001917|2.3830494091309107|   12.729442327289009|  12.975645449900256|21.710340636464906|1.4223091426303254|
|    50%|              1.75|               9.0|               2.0|   10.166666666666666|  11.095081967213115|24.923076923076927|               1.0|
|    75%|              3.11|              13.5|               3.0|                16.05|   14.73899692937564|             30.75|               1.0|
|    max|             99.46|             495.0|            888.88|               179.65|   99.31034482758622|          630000.0|               9.0|
+-------+------------------+------------------+------------------+---------------------+--------------------+------------------+------------------+

Register a Temporary View for SQL-Based Analysis¶

In [26]:
# Register the dataframe as a temporary view for SQL analysis
cleaned_df.createOrReplaceTempView("nyc_taxi")

# Basic count by time of day
print("\nTrip Distribution by Time of Day:")
spark.sql("""
    SELECT 
        time_of_day,
        COUNT(*) as trip_count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM nyc_taxi), 2) as percentage
    FROM 
        nyc_taxi
    GROUP BY 
        time_of_day
    ORDER BY 
        CASE 
            WHEN time_of_day = 'Morning' THEN 1
            WHEN time_of_day = 'Afternoon' THEN 2
            WHEN time_of_day = 'Evening' THEN 3
            WHEN time_of_day = 'Night' THEN 4
        END
""").show()
Trip Distribution by Time of Day:
+-----------+----------+----------+
|time_of_day|trip_count|percentage|
+-----------+----------+----------+
|    Morning|    529236|     23.03|
|  Afternoon|    912874|     39.73|
|    Evening|    522296|     22.73|
|      Night|    333226|     14.50|
+-----------+----------+----------+

Analyze Trip Patterns by Hour, Day, and Weekend vs. Weekday¶

In [27]:
# Hourly patterns
print("\nHourly Trip Statistics:")
spark.sql("""
    SELECT 
        pickup_hour,
        COUNT(*) as trip_count,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(tip_amount), 2) as avg_tip,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(trip_duration_minutes), 2) as avg_duration,
        ROUND(AVG(tip_percentage), 2) as avg_tip_percentage
    FROM 
        nyc_taxi
    GROUP BY 
        pickup_hour
    ORDER BY 
        pickup_hour
""").show(24)

# Day of week patterns
print("\nDay of Week Trip Statistics:")
spark.sql("""
    SELECT 
        pickup_day,
        CASE pickup_day
            WHEN 1 THEN 'Sunday'
            WHEN 2 THEN 'Monday'
            WHEN 3 THEN 'Tuesday'
            WHEN 4 THEN 'Wednesday'
            WHEN 5 THEN 'Thursday'
            WHEN 6 THEN 'Friday'
            WHEN 7 THEN 'Saturday'
        END as day_name,
        COUNT(*) as trip_count,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(tip_amount), 2) as avg_tip,
        ROUND(AVG(trip_distance), 2) as avg_distance
    FROM 
        nyc_taxi
    GROUP BY 
        pickup_day
    ORDER BY 
        pickup_day
""").show()

# Weekend vs. weekday comparison
print("\nWeekend vs. Weekday Comparison:")
spark.sql("""
    SELECT 
        CASE WHEN is_weekend = true THEN 'Weekend' ELSE 'Weekday' END as day_type,
        COUNT(*) as trip_count,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(tip_amount), 2) as avg_tip,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(trip_duration_minutes), 2) as avg_duration,
        ROUND(AVG(speed_mph), 2) as avg_speed,
        ROUND(AVG(tip_percentage), 2) as avg_tip_percentage
    FROM 
        nyc_taxi
    GROUP BY 
        is_weekend
""").show()
Hourly Trip Statistics:
+-----------+----------+--------+-------+------------+------------+------------------+
|pickup_hour|trip_count|avg_fare|avg_tip|avg_distance|avg_duration|avg_tip_percentage|
+-----------+----------+--------+-------+------------+------------+------------------+
|          0|     55070|   14.11|   2.61|        3.95|       12.19|             21.48|
|          1|     38511|   13.14|   2.46|        3.55|       11.61|              22.1|
|          2|     26338|   12.43|   2.37|        3.28|       11.26|             22.63|
|          3|     17360|   13.23|    2.3|        3.55|        11.7|             21.03|
|          4|     11107|   15.22|   2.39|        4.25|       12.52|             19.13|
|          5|     12719|   18.11|   2.65|        5.38|       13.41|             17.31|
|          6|     32597|   15.13|   2.46|        4.33|       12.96|             19.27|
|          7|     67359|   12.73|   2.29|        3.23|       12.68|             20.45|
|          8|     93472|   12.25|   2.27|        2.86|       13.04|             20.99|
|          9|    102185|   11.94|    2.2|        2.76|       12.57|             20.72|
|         10|    112231|   12.01|   2.16|        2.79|       12.57|             20.36|
|         11|    121392|    11.7|   2.15|        2.66|       12.48|             20.38|
|         12|    133163|    11.8|   2.17|        2.71|       12.47|             20.43|
|         13|    138524|   12.34|   2.24|        2.93|       12.84|             24.85|
|         14|    153809|   12.86|   2.33|        3.07|       13.67|             20.21|
|         15|    163647|   12.93|   2.37|        3.12|       14.05|              21.9|
|         16|    158017|   12.84|   2.46|        3.07|       13.72|              21.7|
|         17|    165714|   12.48|   2.47|         3.0|       13.38|             22.43|
|         18|    166125|   11.89|   2.41|        2.88|       12.38|              22.8|
|         19|    141987|   12.18|   2.47|        3.13|       11.98|             22.88|
|         20|    110877|    12.8|   2.53|        3.37|       11.88|             22.29|
|         21|    103307|   12.98|   2.61|        3.48|       11.83|             22.62|
|         22|     97946|   13.44|   2.64|        3.66|       12.11|             22.13|
|         23|     74175|   13.99|   2.63|        3.91|       12.18|             21.54|
+-----------+----------+--------+-------+------------+------------+------------------+


Day of Week Trip Statistics:
+----------+---------+----------+--------+-------+------------+
|pickup_day| day_name|trip_count|avg_fare|avg_tip|avg_distance|
+----------+---------+----------+--------+-------+------------+
|         1|   Sunday|    318459|   13.68|   2.55|        3.66|
|         2|   Monday|    348715|   13.26|   2.45|        3.41|
|         3|  Tuesday|    304095|   12.39|   2.32|        3.04|
|         4|Wednesday|    317270|   12.14|    2.3|        2.91|
|         5| Thursday|    333271|   12.27|   2.34|        2.91|
|         6|   Friday|    337075|   12.29|   2.37|        2.91|
|         7| Saturday|    338747|   12.34|   2.36|         3.1|
+----------+---------+----------+--------+-------+------------+


Weekend vs. Weekday Comparison:
+--------+----------+--------+-------+------------+------------+---------+------------------+
|day_type|trip_count|avg_fare|avg_tip|avg_distance|avg_duration|avg_speed|avg_tip_percentage|
+--------+----------+--------+-------+------------+------------+---------+------------------+
| Weekend|    657206|   12.99|   2.45|        3.37|       12.23|    14.26|             21.32|
| Weekday|   1640426|   12.48|   2.36|        3.04|       12.93|    12.46|             21.87|
+--------+----------+--------+-------+------------+------------+---------+------------------+

Analyze Fares and Tips¶

In [28]:
# Analyze tip percentage distribution
print("\nTip Percentage Distribution:")
spark.sql("""
    SELECT 
        CASE 
            WHEN tip_percentage = 0 THEN 'No Tip'
            WHEN tip_percentage <= 10 THEN '1-10%'
            WHEN tip_percentage <= 15 THEN '11-15%'
            WHEN tip_percentage <= 20 THEN '16-20%'
            WHEN tip_percentage <= 25 THEN '21-25%'
            ELSE 'Over 25%'
        END as tip_bracket,
        COUNT(*) as trip_count,
        ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM nyc_taxi), 2) as percentage
    FROM 
        nyc_taxi
    GROUP BY 
        CASE 
            WHEN tip_percentage = 0 THEN 'No Tip'
            WHEN tip_percentage <= 10 THEN '1-10%'
            WHEN tip_percentage <= 15 THEN '11-15%'
            WHEN tip_percentage <= 20 THEN '16-20%'
            WHEN tip_percentage <= 25 THEN '21-25%'
            ELSE 'Over 25%'
        END
    ORDER BY 
        CASE tip_bracket
            WHEN 'No Tip' THEN 1
            WHEN '1-10%' THEN 2
            WHEN '11-15%' THEN 3
            WHEN '16-20%' THEN 4
            WHEN '21-25%' THEN 5
            ELSE 6
        END
""").show()

# Analyze correlation between distance and fare
print("\nCorrelation between Trip Distance and Fare Amount:")
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Create vector column for correlation analysis
assembler = VectorAssembler(inputCols=["trip_distance", "fare_amount"], outputCol="features")
vector_df = assembler.transform(cleaned_df)

# Calculate correlation
corr_matrix = Correlation.corr(vector_df, "features").collect()[0][0]
print(f"Pearson correlation coefficient: {corr_matrix[0, 1]}")
Tip Percentage Distribution:
+-----------+----------+----------+
|tip_bracket|trip_count|percentage|
+-----------+----------+----------+
|     No Tip|    545479|     23.74|
|      1-10%|     64465|      2.81|
|     11-15%|    118369|      5.15|
|     16-20%|    148360|      6.46|
|     21-25%|    292198|     12.72|
|   Over 25%|   1128761|     49.13|
+-----------+----------+----------+


Correlation between Trip Distance and Fare Amount:
Pearson correlation coefficient: 0.9513520816567353
In [29]:
# Set up the visualization environment
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = [12, 6]

# Sample data for visualization - Fixed the min() function issue
# In PySpark, min() is used differently than in Python
sample_size = 10000
total_count = cleaned_df.count()
sample_ratio = 0.1 if total_count > 100000 else sample_size / float(total_count)

# Get a sample of the data for visualization
sample_df = cleaned_df.sample(withReplacement=False, fraction=sample_ratio, seed=42).toPandas()
print(f"Sampled {len(sample_df)} records out of {total_count} for visualization")
Sampled 230301 records out of 2297632 for visualization

Trip Count by Hour¶

In [30]:
# 1. Trip Count by Hour
hourly_counts = cleaned_df.groupBy("pickup_hour").count().orderBy("pickup_hour").toPandas()

plt.figure(figsize=(10, 5))
plt.bar(hourly_counts["pickup_hour"], hourly_counts["count"], color='skyblue')
plt.title("Number of Taxi Trips by Hour of Day", fontsize=14)
plt.xlabel("Hour of Day", fontsize=12)
plt.ylabel("Number of Trips", fontsize=12)
plt.xticks(range(0, 24))
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
No description has been provided for this image

Average Fare by Hour¶

In [31]:
# 2. Average Fare by Hour
hourly_fares = cleaned_df.groupBy("pickup_hour") \
    .agg(F.avg("fare_amount").alias("avg_fare")) \
    .orderBy("pickup_hour") \
    .toPandas()

plt.figure(figsize=(10, 5))
plt.plot(hourly_fares["pickup_hour"], hourly_fares["avg_fare"], 'o-', color='orange', linewidth=2)
plt.title("Average Fare by Hour of Day", fontsize=14)
plt.xlabel("Hour of Day", fontsize=12)
plt.ylabel("Average Fare ($)", fontsize=12)
plt.xticks(range(0, 24))
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
No description has been provided for this image

Trip Distance Distribution¶

In [32]:
# 3. Trip Distance Distribution
plt.figure(figsize=(10, 5))
plt.hist(sample_df["trip_distance"], bins=30, alpha=0.7, color='green')
plt.title("Distribution of Trip Distances", fontsize=14)
plt.xlabel("Trip Distance (miles)", fontsize=12)
plt.ylabel("Frequency", fontsize=12)
plt.xlim(0, 20)  # Focus on most common distances
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
No description has been provided for this image

Trip Distance vs. Fare Amount (Scatter Plot)¶

In [33]:
# 4. Trip Distance vs. Fare Amount Scatter Plot
plt.figure(figsize=(10, 5))
plt.scatter(sample_df["trip_distance"], sample_df["fare_amount"], alpha=0.5, color='purple', s=10)
plt.title("Relationship Between Trip Distance and Fare Amount", fontsize=14)
plt.xlabel("Trip Distance (miles)", fontsize=12)
plt.ylabel("Fare Amount ($)", fontsize=12)
plt.xlim(0, 20)
plt.ylim(0, 100)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
No description has been provided for this image

Trip Count vs Tip Percentage¶

In [34]:
# First, calculate the data needed for the plots
# 1. Trip Count by Hour
hourly_counts = cleaned_df.groupBy("pickup_hour").count().orderBy("pickup_hour").toPandas()

# 2. Average Tip Percentage by Hour
hourly_tips = cleaned_df.groupBy("pickup_hour") \
    .agg(F.avg("tip_percentage").alias("avg_tip_pct")) \
    .orderBy("pickup_hour") \
    .toPandas()

# Now create the side-by-side plot
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 5))

# Left plot - Trip Count
ax1.bar(hourly_counts["pickup_hour"], hourly_counts["count"], color='skyblue')
ax1.set_title("Trip Count by Hour", fontsize=14)
ax1.set_xlabel("Hour of Day", fontsize=12)
ax1.set_ylabel("Number of Trips", fontsize=12)
ax1.set_xticks(range(0, 24))
ax1.grid(axis='y', linestyle='--', alpha=0.7)

# Right plot - Tip Percentage
ax2.plot(hourly_tips["pickup_hour"], hourly_tips["avg_tip_pct"], 'o-', color='red', linewidth=2)
ax2.set_title("Average Tip Percentage by Hour", fontsize=14)
ax2.set_xlabel("Hour of Day", fontsize=12)
ax2.set_ylabel("Tip Percentage (%)", fontsize=12)
ax2.set_xticks(range(0, 24))
ax2.grid(True, linestyle='--', alpha=0.7)

plt.tight_layout()
plt.show()
No description has been provided for this image

Distribution of Key Metrics¶

In [35]:
# 7. Multi-panel distribution plots
fig, axs = plt.subplots(2, 2, figsize=(16, 10))

# Trip Distance
sns.histplot(sample_df["trip_distance"], bins=30, kde=True, ax=axs[0, 0], color='blue')
axs[0, 0].set_title('Trip Distance Distribution')
axs[0, 0].set_xlim(0, 20)

# Fare Amount
sns.histplot(sample_df["fare_amount"], bins=30, kde=True, ax=axs[0, 1], color='green')
axs[0, 1].set_title('Fare Amount Distribution')
axs[0, 1].set_xlim(0, 80)

# Trip Duration
sns.histplot(sample_df["trip_duration_minutes"], bins=30, kde=True, ax=axs[1, 0], color='orange')
axs[1, 0].set_title('Trip Duration Distribution (minutes)')
axs[1, 0].set_xlim(0, 60)

# Tip Percentage
sns.histplot(sample_df["tip_percentage"], bins=30, kde=True, ax=axs[1, 1], color='red')
axs[1, 1].set_title('Tip Percentage Distribution')
axs[1, 1].set_xlim(0, 30)

plt.tight_layout()
plt.show()
No description has been provided for this image

Weekday vs. Weekend Comparison¶

In [36]:
# 8. Weekday vs Weekend comparison
day_type_stats = cleaned_df.groupBy("is_weekend") \
    .agg(
        F.count("*").alias("trip_count"),
        F.avg("fare_amount").alias("avg_fare"),
        F.avg("tip_amount").alias("avg_tip"),
        F.avg("trip_distance").alias("avg_distance"),
        F.avg("trip_duration_minutes").alias("avg_duration")
    ) \
    .toPandas()

# Convert boolean to string for better labels
day_type_stats["day_type"] = day_type_stats["is_weekend"].apply(lambda x: "Weekend" if x else "Weekday")

# Set up the figure
fig, axs = plt.subplots(2, 2, figsize=(16, 10))

# Average Fare
axs[0, 0].bar(day_type_stats["day_type"], day_type_stats["avg_fare"], color=['blue', 'red'])
axs[0, 0].set_title('Average Fare by Day Type', fontsize=14)
axs[0, 0].set_ylabel('Average Fare ($)', fontsize=12)
axs[0, 0].grid(axis='y', linestyle='--', alpha=0.7)

# Average Tip
axs[0, 1].bar(day_type_stats["day_type"], day_type_stats["avg_tip"], color=['blue', 'red'])
axs[0, 1].set_title('Average Tip by Day Type', fontsize=14)
axs[0, 1].set_ylabel('Average Tip ($)', fontsize=12)
axs[0, 1].grid(axis='y', linestyle='--', alpha=0.7)

# Average Distance
axs[1, 0].bar(day_type_stats["day_type"], day_type_stats["avg_distance"], color=['blue', 'red'])
axs[1, 0].set_title('Average Distance by Day Type', fontsize=14)
axs[1, 0].set_ylabel('Average Distance (miles)', fontsize=12)
axs[1, 0].grid(axis='y', linestyle='--', alpha=0.7)

# Average Duration
axs[1, 1].bar(day_type_stats["day_type"], day_type_stats["avg_duration"], color=['blue', 'red'])
axs[1, 1].set_title('Average Duration by Day Type', fontsize=14)
axs[1, 1].set_ylabel('Average Duration (minutes)', fontsize=12)
axs[1, 1].grid(axis='y', linestyle='--', alpha=0.7)

plt.tight_layout()
plt.show()
No description has been provided for this image

Correlation Matrix Heatmap¶

In [37]:
# 9. Correlation matrix heatmap
# Select numeric columns for correlation analysis
numeric_cols = ["trip_distance", "fare_amount", "tip_amount", "trip_duration_minutes", 
               "speed_mph", "tip_percentage", "passenger_count"]

# Calculate correlations using pandas
corr_df = sample_df[numeric_cols].corr()

# Create a heatmap
plt.figure(figsize=(12, 10))
sns.heatmap(corr_df, annot=True, cmap='coolwarm', vmin=-1, vmax=1, linewidths=0.5)
plt.title('Correlation Matrix of Numeric Features', fontsize=16)
plt.tight_layout()
plt.show()
No description has been provided for this image

🔍 Correlation Analysis of Numeric Features¶

This heatmap represents the correlation matrix of numeric features in the NYC Taxi dataset.
Each cell shows the Pearson correlation coefficient between two variables, ranging from -1 (perfect negative correlation) to +1 (perfect positive correlation).


✅ Strong Positive Correlations¶

Feature Pair Correlation Interpretation
trip_distance & fare_amount 0.95 Longer trips tend to cost more — as expected.
trip_duration_minutes & fare_amount 0.85 Longer durations generally result in higher fares.
trip_distance & trip_duration_minutes 0.80 Distance and duration increase together (logical).
trip_distance & speed_mph 0.75 Longer trips are often on faster roads (e.g., highways).
fare_amount & speed_mph 0.66 Higher fares often correlate with faster routes (possibly longer distances).

⚠️ Moderate Correlations¶

Feature Pair Correlation Interpretation
fare_amount & tip_amount 0.55 Higher fares tend to result in larger tips.
trip_distance & tip_amount 0.55 Tip amount also scales with distance to some degree.
tip_amount & tip_percentage 0.51 Amounts scale together, but percentage varies due to outliers or rounding.

🚫 Negligible or Weak Correlations¶

Feature Pair Correlation Interpretation
tip_percentage & trip_distance -0.12 Tip % is weakly negatively correlated — riders may tip less on long trips.
tip_percentage & fare_amount -0.14 Slight negative trend — larger fares may lead to lower relative tipping.
passenger_count & all variables ~0.00 Passenger count has no meaningful correlation — may reflect noisy input.

🧠 Final Interpretation of Correlation Matrix¶

The correlation matrix reveals clear patterns that can inform both domain understanding and predictive modeling:


🔑 Fare Prediction Is Highly Feasible¶

  • Fare amount has very strong positive correlations with trip distance (0.95) and trip duration (0.85).
  • These strong relationships validate the use of regression-based models (linear, tree-based, or ensemble) for fare prediction.

💸 Tip Amount vs. Tip Percentage¶

  • Tip amount is moderately correlated with fare and distance, making it predictable to an extent.
  • Tip percentage, however, is weakly or negatively correlated with other features, suggesting it is influenced more by external or behavioral factors (e.g., passenger generosity, service quality, payment method).

🚫 Passenger Count Has Limited Analytical Value¶

  • Passenger count has near-zero correlation with all other variables, indicating it may not be useful for most predictive tasks unless combined with external factors or cleaemogra driver/rider behavior analysis. driver/rider behavior analysis. behavior analysis.
In [38]:
# Import necessary libraries
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
In [39]:
# Get hourly demand data
hourly_demand = cleaned_df.groupBy("pickup_hour") \
    .count() \
    .withColumnRenamed("count", "trip_count") \
    .orderBy("pickup_hour") \
    .toPandas()

# Create time period labels for better visualization
hourly_demand["time_period"] = hourly_demand["pickup_hour"].apply(
    lambda x: "Morning (6-12)" if 6 <= x < 12 else
              "Afternoon (12-18)" if 12 <= x < 18 else
              "Evening (18-22)" if 18 <= x < 22 else
              "Night (22-6)"
)

print(f"Data prepared with {len(hourly_demand)} hourly data points")
hourly_demand.head()
Data prepared with 24 hourly data points
Out[39]:
pickup_hour trip_count time_period
0 0 55070 Night (22-6)
1 1 38511 Night (22-6)
2 2 26338 Night (22-6)
3 3 17360 Night (22-6)
4 4 11107 Night (22-6)
In [40]:
# Create the basic demand by hour visualization
fig = px.bar(
    hourly_demand, 
    x="pickup_hour", 
    y="trip_count",
    title="NYC Taxi Demand by Hour of Day",
    labels={"pickup_hour": "Hour of Day", "trip_count": "Number of Trips"},
    color="time_period",
    color_discrete_map={
        "Morning (6-12)": "#FAA42A",    # Orange
        "Afternoon (12-18)": "#1E88E5",  # Blue
        "Evening (18-22)": "#8E24AA",    # Purple
        "Night (22-6)": "#303F9F"       # Dark blue
    }
)

# Enhance the visualization
fig.update_layout(
    plot_bgcolor='white',
    font=dict(family="Arial", size=12),
    legend_title="Time Period",
    xaxis=dict(
        title="Hour of Day",
        tickmode='linear',
        tick0=0,
        dtick=1,
        gridcolor='lightgray'
    ),
    yaxis=dict(
        title="Number of Trips",
        gridcolor='lightgray'
    ),
    hoverlabel=dict(
        bgcolor="white",
        font_size=12,
        font_family="Arial"
    ),
    width=1000,
    height=600
)

# Add peak demand annotation
peak_hour = hourly_demand.loc[hourly_demand["trip_count"].idxmax()]
fig.add_annotation(
    x=peak_hour["pickup_hour"],
    y=peak_hour["trip_count"],
    text=f"Peak Demand: {peak_hour['pickup_hour']}:00",
    showarrow=True,
    arrowhead=2,
    arrowsize=1,
    arrowwidth=2,
    arrowcolor="#E53935",
    ax=0,
    ay=-40,
    bordercolor="#E53935",
    borderwidth=2,
    borderpad=4,
    bgcolor="white",
    opacity=0.8
)

# Show the plot
fig.show()
In [41]:
# Create a more interactive version with weekday vs weekend comparison
weekday_weekend_demand = cleaned_df.groupBy(["pickup_hour", "is_weekend"]) \
    .count() \
    .withColumnRenamed("count", "trip_count") \
    .orderBy("pickup_hour") \
    .toPandas()

# Convert boolean to string for better display
weekday_weekend_demand["day_type"] = weekday_weekend_demand["is_weekend"].apply(
    lambda x: "Weekend" if x else "Weekday"
)

print(f"Weekday vs Weekend data prepared with {len(weekday_weekend_demand)} data points")
weekday_weekend_demand.head()
Weekday vs Weekend data prepared with 48 data points
Out[41]:
pickup_hour is_weekend trip_count day_type
0 0 False 24650 Weekday
1 0 True 30420 Weekend
2 1 False 12721 Weekday
3 1 True 25790 Weekend
4 2 True 19068 Weekend
In [42]:
# Create comparison figure
fig2 = px.line(
    weekday_weekend_demand,
    x="pickup_hour",
    y="trip_count",
    color="day_type",
    title="NYC Taxi Demand: Weekday vs Weekend by Hour",
    labels={"pickup_hour": "Hour of Day", "trip_count": "Number of Trips", "day_type": "Day Type"},
    color_discrete_map={"Weekday": "#1E88E5", "Weekend": "#E53935"},
    markers=True,
    line_shape="spline"
)

# Enhance the second visualization
fig2.update_layout(
    plot_bgcolor='white',
    font=dict(family="Arial", size=12),
    xaxis=dict(
        title="Hour of Day",
        tickmode='linear',
        tick0=0,
        dtick=1,
        gridcolor='lightgray'
    ),
    yaxis=dict(
        title="Number of Trips",
        gridcolor='lightgray'
    ),
    legend_title="Day Type",
    hoverlabel=dict(
        bgcolor="white",
        font_size=12,
        font_family="Arial"
    ),
    width=1000,
    height=600
)

# Show the plot
fig2.show()

Multi Year Analysis Data Loading¶

In [52]:
# Configure multi-year data loading
data_path = "/home/rvasappanavara/rvasappanavara/shared/nyc_taxi_data/"
start_year = 2014  # Start year
end_year = 2024    # End year
months = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]  # All months

# Create a list to store DataFrames
all_dataframes = []

# Function to read data by year
def load_yearly_data(year, months):
    """Load data for a specific year across all months"""
    yearly_dataframes = []
    
    for month in months:
        file_path = f"{data_path}yellow_tripdata_{year}-{month:02d}.parquet"
        
        if os.path.exists(file_path):
            try:
                df = spark.read.parquet(file_path)
                # Add year and month columns for easier filtering
                df = df.withColumn("data_year", F.lit(year)) \
                       .withColumn("data_month", F.lit(month))
                yearly_dataframes.append(df)
                print(f"✓ Loaded {year}-{month:02d} with {df.count()} records")
            except Exception as e:
                print(f"✗ Error loading {year}-{month:02d}: {str(e)}")
        else:
            print(f"✗ File not found: {file_path}")
    
    # Union all monthly DataFrames for this year
    if yearly_dataframes:
        year_df = yearly_dataframes[0]
        for df in yearly_dataframes[1:]:
            year_df = year_df.unionByName(df, allowMissingColumns=True)
        return year_df
    return None
In [53]:
# Process years in chunks to manage memory
print("Loading taxi data for multi-year analysis...")
print(f"Year range: {start_year} to {end_year}")
print("-" * 50)

for year in range(start_year, end_year + 1):
    year_df = load_yearly_data(year, months)
    if year_df is not None:
        all_dataframes.append(year_df)
        print(f"Completed year {year}")
        print("-" * 30)

# Combine all years into one DataFrame
if all_dataframes:
    full_taxi_df = all_dataframes[0]
    for df in all_dataframes[1:]:
        full_taxi_df = full_taxi_df.unionByName(df, allowMissingColumns=True)
    
    print("\nSummary:")
    print(f"Successfully loaded data from {start_year} to {end_year}")
    print(f"Total records: {full_taxi_df.count():,}")
    print(f"Total columns: {len(full_taxi_df.columns)}")
else:
    print("Error: No data could be loaded.")
Loading taxi data for multi-year analysis...
Year range: 2014 to 2024
--------------------------------------------------
✓ Loaded 2014-01 with 13782517 records
✓ Loaded 2014-02 with 13063794 records
✓ Loaded 2014-03 with 15428134 records
✓ Loaded 2014-04 with 14618759 records
✓ Loaded 2014-05 with 14774048 records
✓ Loaded 2014-06 with 13813079 records
✓ Loaded 2014-07 with 13104273 records
✓ Loaded 2014-08 with 12698743 records
✓ Loaded 2014-09 with 13424350 records
✓ Loaded 2014-10 with 14317774 records
✓ Loaded 2014-11 with 13309991 records
✓ Loaded 2014-12 with 13112117 records
Completed year 2014
------------------------------
✓ Loaded 2015-01 with 12741035 records
✓ Loaded 2015-02 with 12442394 records
✓ Loaded 2015-03 with 13342951 records
✓ Loaded 2015-04 with 13063758 records
✓ Loaded 2015-05 with 13157677 records
✓ Loaded 2015-06 with 12324936 records
✓ Loaded 2015-07 with 11559666 records
✓ Loaded 2015-08 with 11123123 records
✓ Loaded 2015-09 with 11218122 records
✓ Loaded 2015-10 with 12307333 records
✓ Loaded 2015-11 with 11305240 records
✓ Loaded 2015-12 with 11452996 records
Completed year 2015
------------------------------
✓ Loaded 2016-01 with 10905067 records
✓ Loaded 2016-02 with 11375412 records
✓ Loaded 2016-03 with 12203824 records
✓ Loaded 2016-04 with 11927996 records
✓ Loaded 2016-05 with 11832049 records
✓ Loaded 2016-06 with 11131645 records
✓ Loaded 2016-07 with 10294080 records
✓ Loaded 2016-08 with 9942263 records
✓ Loaded 2016-09 with 10116018 records
✓ Loaded 2016-10 with 10854626 records
✓ Loaded 2016-11 with 10102128 records
✓ Loaded 2016-12 with 10446697 records
Completed year 2016
------------------------------
✓ Loaded 2017-01 with 9710820 records
✓ Loaded 2017-02 with 9169775 records
✓ Loaded 2017-03 with 10295441 records
✓ Loaded 2017-04 with 10047135 records
✓ Loaded 2017-05 with 10102127 records
✓ Loaded 2017-06 with 9656993 records
✓ Loaded 2017-07 with 8588486 records
✓ Loaded 2017-08 with 8422153 records
✓ Loaded 2017-09 with 8945421 records
✓ Loaded 2017-10 with 9768672 records
✓ Loaded 2017-11 with 9284803 records
✓ Loaded 2017-12 with 9508501 records
Completed year 2017
------------------------------
✓ Loaded 2018-01 with 8760687 records
✓ Loaded 2018-02 with 8492819 records
✓ Loaded 2018-03 with 9431289 records
✓ Loaded 2018-04 with 9306216 records
✓ Loaded 2018-05 with 9224788 records
✓ Loaded 2018-06 with 8714667 records
✓ Loaded 2018-07 with 7851143 records
✓ Loaded 2018-08 with 7855040 records
✓ Loaded 2018-09 with 8049094 records
✓ Loaded 2018-10 with 8834520 records
✓ Loaded 2018-11 with 8155449 records
✓ Loaded 2018-12 with 8195675 records
Completed year 2018
------------------------------
✓ Loaded 2019-01 with 7696617 records
✓ Loaded 2019-02 with 7049370 records
✓ Loaded 2019-03 with 7866620 records
✓ Loaded 2019-04 with 7475949 records
✓ Loaded 2019-05 with 7598445 records
✓ Loaded 2019-06 with 6971560 records
✓ Loaded 2019-07 with 6310419 records
✓ Loaded 2019-08 with 6073357 records
✓ Loaded 2019-09 with 6567788 records
✓ Loaded 2019-10 with 7213891 records
✓ Loaded 2019-11 with 6878111 records
✓ Loaded 2019-12 with 6896317 records
Completed year 2019
------------------------------
✓ Loaded 2020-01 with 6405008 records
✓ Loaded 2020-02 with 6299367 records
✓ Loaded 2020-03 with 3007687 records
✓ Loaded 2020-04 with 238073 records
✓ Loaded 2020-05 with 348415 records
✓ Loaded 2020-06 with 549797 records
✓ Loaded 2020-07 with 800412 records
✓ Loaded 2020-08 with 1007286 records
✓ Loaded 2020-09 with 1341017 records
✓ Loaded 2020-10 with 1681132 records
✓ Loaded 2020-11 with 1509000 records
✓ Loaded 2020-12 with 1461898 records
Completed year 2020
------------------------------
✓ Loaded 2021-01 with 1369769 records
✓ Loaded 2021-02 with 1371709 records
✓ Loaded 2021-03 with 1925152 records
✓ Loaded 2021-04 with 2171187 records
✓ Loaded 2021-05 with 2507109 records
✓ Loaded 2021-06 with 2834264 records
✓ Loaded 2021-07 with 2821746 records
✓ Loaded 2021-08 with 2788757 records
✓ Loaded 2021-09 with 2963793 records
✓ Loaded 2021-10 with 3463504 records
✓ Loaded 2021-11 with 3472949 records
✓ Loaded 2021-12 with 3214369 records
Completed year 2021
------------------------------
✓ Loaded 2022-01 with 2463931 records
✓ Loaded 2022-02 with 2979431 records
✓ Loaded 2022-03 with 3627882 records
✓ Loaded 2022-04 with 3599920 records
✓ Loaded 2022-05 with 3588295 records
✓ Loaded 2022-06 with 3558124 records
✓ Loaded 2022-07 with 3174394 records
✓ Loaded 2022-08 with 3152677 records
✓ Loaded 2022-09 with 3183767 records
✓ Loaded 2022-10 with 3675411 records
✓ Loaded 2022-11 with 3252717 records
✓ Loaded 2022-12 with 3399549 records
Completed year 2022
------------------------------
✓ Loaded 2023-01 with 3066766 records
✓ Loaded 2023-02 with 2913955 records
✓ Loaded 2023-03 with 3403766 records
✓ Loaded 2023-04 with 3288250 records
✓ Loaded 2023-05 with 3513649 records
✓ Loaded 2023-06 with 3307234 records
✓ Loaded 2023-07 with 2907108 records
✓ Loaded 2023-08 with 2824209 records
✓ Loaded 2023-09 with 2846722 records
✓ Loaded 2023-10 with 3522285 records
✓ Loaded 2023-11 with 3339715 records
✓ Loaded 2023-12 with 3376567 records
Completed year 2023
------------------------------
✓ Loaded 2024-01 with 2964624 records
✓ Loaded 2024-02 with 3007526 records
✓ Loaded 2024-03 with 3582628 records
✓ Loaded 2024-04 with 3514289 records
✓ Loaded 2024-05 with 3723833 records
✓ Loaded 2024-06 with 3539193 records
✓ Loaded 2024-07 with 3076903 records
✓ Loaded 2024-08 with 2979183 records
✓ Loaded 2024-09 with 3633030 records
✓ Loaded 2024-10 with 3833771 records
✓ Loaded 2024-11 with 3646369 records
✓ Loaded 2024-12 with 3668371 records
Completed year 2024
------------------------------

Summary:
Successfully loaded data from 2014 to 2024
Total records: 918,278,217
Total columns: 21

Standardize Column Names Across Years¶

In [54]:
# Function to standardize column names across different years
def standardize_columns(df):
    """Standardize column names for consistency across years"""
    
    # Define column mappings
    column_mappings = {
        # Datetime columns
        "tpep_pickup_datetime": "pickup_datetime",
        "tpep_dropoff_datetime": "dropoff_datetime",
        "Trip_Pickup_DateTime": "pickup_datetime",
        "Trip_Dropoff_DateTime": "dropoff_datetime",
        
        # Distance/passenger columns
        "trip_distance": "trip_distance",
        "passenger_count": "passenger_count",
        
        # Financial columns
        "fare_amount": "fare_amount",
        "tip_amount": "tip_amount",
        "total_amount": "total_amount",
        "mta_tax": "mta_tax",
        "extra": "extra",
        "improvement_surcharge": "improvement_surcharge",
        "congestion_surcharge": "congestion_surcharge",
        
        # Location columns
        "PULocationID": "pickup_location_id",
        "DOLocationID": "dropoff_location_id",
        "pickup_latitude": "pickup_lat",
        "pickup_longitude": "pickup_lon",
        "dropoff_latitude": "dropoff_lat",
        "dropoff_longitude": "dropoff_lon",
    }
    
    # Apply column renaming
    for old_col, new_col in column_mappings.items():
        if old_col in df.columns:
            df = df.withColumnRenamed(old_col, new_col)
    
    return df

# Apply standardization to the DataFrame
standardized_df = standardize_columns(full_taxi_df)

# Verify the standardized columns
print("Standardized columns:")
print(standardized_df.columns)
Standardized columns:
['VendorID', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'pickup_location_id', 'dropoff_location_id', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee', 'data_year', 'data_month']

Enhanced Temporal Features for Multi-Year Analysis¶

In [55]:
# Add comprehensive temporal features
def add_multiyear_features(df):
    """Add temporal and derived features for multi-year analysis"""
    
    # Determine pickup/dropoff datetime column names
    if "pickup_datetime" in df.columns:
        pickup_col = "pickup_datetime"
        dropoff_col = "dropoff_datetime"
    else:
        # Try to find the right columns
        pickup_col = next((col for col in df.columns if "pickup" in col.lower() and "datetime" in col.lower()), None)
        dropoff_col = next((col for col in df.columns if "dropoff" in col.lower() and "datetime" in col.lower()), None)
    
    if not pickup_col or not dropoff_col:
        raise ValueError("Could not find pickup/dropoff datetime columns")
    
    # Add comprehensive temporal features
    enhanced_df = df.withColumn("pickup_hour", F.hour(F.col(pickup_col))) \
        .withColumn("pickup_day", F.dayofweek(F.col(pickup_col))) \
        .withColumn("pickup_month", F.month(F.col(pickup_col))) \
        .withColumn("pickup_year", F.year(F.col(pickup_col))) \
        .withColumn("pickup_date", F.to_date(F.col(pickup_col))) \
        .withColumn("pickup_timestamp", F.unix_timestamp(F.col(pickup_col))) \
        .withColumn("is_weekend", F.when(F.dayofweek(F.col(pickup_col)).isin([1, 7]), True).otherwise(False)) \
        .withColumn("is_weekday", F.when(F.dayofweek(F.col(pickup_col)).isin([2,3,4,5,6]), True).otherwise(False)) \
        .withColumn("trip_duration_minutes", 
                    (F.unix_timestamp(F.col(dropoff_col)) - F.unix_timestamp(F.col(pickup_col))) / 60) \
        .withColumn("time_of_day", 
                   F.when((F.hour(F.col(pickup_col)) >= 6) & (F.hour(F.col(pickup_col)) < 12), "Morning")
                   .when((F.hour(F.col(pickup_col)) >= 12) & (F.hour(F.col(pickup_col)) < 18), "Afternoon")
                   .when((F.hour(F.col(pickup_col)) >= 18) & (F.hour(F.col(pickup_col)) < 22), "Evening")
                   .otherwise("Night")) \
        .withColumn("season", 
                   F.when(F.month(F.col(pickup_col)).isin([12, 1, 2]), "Winter")
                   .when(F.month(F.col(pickup_col)).isin([3, 4, 5]), "Spring")
                   .when(F.month(F.col(pickup_col)).isin([6, 7, 8]), "Summer")
                   .otherwise("Fall"))
    
    # Add financial features
    if "fare_amount" in df.columns and "tip_amount" in df.columns:
        enhanced_df = enhanced_df.withColumn("tip_percentage", 
                                            F.when(F.col("fare_amount") > 0, 
                                                   (F.col("tip_amount") / F.col("fare_amount")) * 100)
                                            .otherwise(0))
    
    # Add speed if trip_distance exists
    if "trip_distance" in df.columns:
        enhanced_df = enhanced_df.withColumn("speed_mph", 
                                            F.when(F.col("trip_duration_minutes") > 0, 
                                                   F.col("trip_distance") / (F.col("trip_duration_minutes") / 60))
                                            .otherwise(0))
    
    return enhanced_df

# Apply features to the standardized DataFrame
enhanced_df = add_multiyear_features(standardized_df)

# Register the enhanced DataFrame as a SQL table
enhanced_df.createOrReplaceTempView("nyc_taxi_multiyear")

print("Enhanced DataFrame with multi-year features created successfully!")
Enhanced DataFrame with multi-year features created successfully!

Multi-Year Analysis Queries¶

In [56]:
# Execute multi-year data frame scan
print("Multi-Year Taxi Data Analysis")
print("=" * 50)

# 1. Yearly Trends
print("\n1. Yearly Trip Counts and Averages:")
yearly_stats = spark.sql("""
    SELECT 
        pickup_year,
        COUNT(*) as total_trips,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(tip_amount), 2) as avg_tip,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(trip_duration_minutes), 2) as avg_duration
    FROM nyc_taxi_multiyear
    WHERE pickup_year IS NOT NULL
    GROUP BY pickup_year
    ORDER BY pickup_year
""")
yearly_stats.show()
Multi-Year Taxi Data Analysis
==================================================

1. Yearly Trip Counts and Averages:
+-----------+-----------+--------+-------+------------+------------+
|pickup_year|total_trips|avg_fare|avg_tip|avg_distance|avg_duration|
+-----------+-----------+--------+-------+------------+------------+
|       2001|         27|   19.17|    1.0|        4.55|      304.24|
|       2002|        498|    20.7|   1.44|        4.04|  2881083.73|
|       2003|         50|   21.28|   0.84|        5.86|      348.43|
|       2004|          1|     7.0|    1.5|        1.59|     1421.17|
|       2008|        770|   18.24|   1.86|        4.75|      254.48|
|       2009|       1298|   15.63|   1.18|        3.72|      197.97|
|       2010|          1|    11.5|   3.06|        2.79|     1188.62|
|       2011|          4|    10.0|   0.59|        1.81|      222.78|
|       2012|          1|     0.0|    0.0|         0.0|     1432.22|
|       2014|  165447580|   12.65|   1.51|       13.36|       13.34|
|       2015|  146039232|   12.94|   1.73|       11.85|       19.72|
|       2016|  131131805|   13.13|    1.8|        4.64|        16.3|
|       2017|  113500386|   13.05|   1.84|        2.93|       16.32|
|       2018|  102870524|   13.08|   1.87|        2.94|       16.89|
|       2019|   84597309|   13.41|   2.19|        3.02|       18.06|
|       2020|   24649266|   12.67|   2.08|        3.53|       15.52|
|       2021|   30903983|   13.52|   2.34|        6.92|       16.58|
|       2022|   39655622|   10.36|   7.23|        5.96|       17.32|
|       2023|   38310138|   19.52|   3.52|        4.09|       16.72|
|       2024|   41169670|   19.27|   3.31|        4.98|       17.47|
+-----------+-----------+--------+-------+------------+------------+
only showing top 20 rows

In [57]:
# 2. Seasonal Patterns
print("\n2. Seasonal Patterns:")
seasonal_trends = spark.sql("""
    SELECT 
        pickup_year,
        season,
        COUNT(*) as trip_count,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(tip_percentage), 2) as avg_tip_pct
    FROM nyc_taxi_multiyear
    WHERE pickup_year IS NOT NULL
    GROUP BY pickup_year, season
    ORDER BY pickup_year, 
             CASE 
                 WHEN season = 'Winter' THEN 1
                 WHEN season = 'Spring' THEN 2
                 WHEN season = 'Summer' THEN 3
                 WHEN season = 'Fall' THEN 4
             END
""")
seasonal_trends.show()
2. Seasonal Patterns:
+-----------+------+----------+--------+-----------+
|pickup_year|season|trip_count|avg_fare|avg_tip_pct|
+-----------+------+----------+--------+-----------+
|       2001|Winter|        26|   19.04|       2.27|
|       2001|Summer|         1|    22.5|        0.0|
|       2002|Winter|        59|   19.83|       4.95|
|       2002|  Fall|       439|   20.81|       7.25|
|       2003|Winter|        49|   21.66|       3.37|
|       2003|Spring|         1|     2.5|        0.4|
|       2004|Spring|         1|     7.0|      21.43|
|       2008|Winter|       767|    18.3|       8.25|
|       2008|Summer|         3|    2.33|      10.29|
|       2009|Winter|      1298|   15.63|       6.48|
|       2010|Summer|         1|    11.5|      26.61|
|       2011|Winter|         4|    10.0|       6.94|
|       2012|Winter|         1|     0.0|        0.0|
|       2014|Winter|  39958428|   12.31|       12.7|
|       2014|Spring|  44820941|   12.56|      11.76|
|       2014|Summer|  39616095|   12.84|      12.46|
|       2014|  Fall|  41052116|    12.9|      13.62|
|       2015|Winter|  36636425|   12.48|      15.57|
|       2015|Spring|  39564386|   12.95|       15.2|
|       2015|Summer|  35007725|   13.09|      14.94|
+-----------+------+----------+--------+-----------+
only showing top 20 rows

In [58]:
# 3. Weekday vs Weekend comparison across years
print("\n3. Weekday vs Weekend Trends by Year:")
weekday_weekend_trends = spark.sql("""
    SELECT 
        pickup_year,
        CASE WHEN is_weekend = true THEN 'Weekend' ELSE 'Weekday' END as day_type,
        COUNT(*) as trip_count,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(tip_percentage), 2) as avg_tip_pct
    FROM nyc_taxi_multiyear
    WHERE pickup_year IS NOT NULL
    GROUP BY pickup_year, is_weekend
    ORDER BY pickup_year, day_type
""")
weekday_weekend_trends.show()
3. Weekday vs Weekend Trends by Year:
+-----------+--------+----------+--------+-----------+
|pickup_year|day_type|trip_count|avg_fare|avg_tip_pct|
+-----------+--------+----------+--------+-----------+
|       2001| Weekday|        25|    20.5|       2.36|
|       2001| Weekend|         2|     2.5|        0.0|
|       2002| Weekday|       379|   21.45|       7.02|
|       2002| Weekend|       119|   18.28|       6.83|
|       2003| Weekday|        47|   22.64|       3.53|
|       2003| Weekend|         3|     0.0|        0.0|
|       2004| Weekend|         1|     7.0|      21.43|
|       2008| Weekday|       770|   18.24|       8.26|
|       2009| Weekday|      1298|   15.63|       6.48|
|       2010| Weekend|         1|    11.5|      26.61|
|       2011| Weekday|         4|    10.0|       6.94|
|       2012| Weekday|         1|     0.0|        0.0|
|       2014| Weekday| 117592503|   12.72|       12.9|
|       2014| Weekend|  47855077|   12.48|       11.9|
|       2015| Weekday| 103648767|   13.04|      15.36|
|       2015| Weekend|  42390465|   12.69|      14.91|
|       2016| Weekday|  93815112|   13.24|       17.2|
|       2016| Weekend|  37316693|   12.86|      15.71|
|       2017| Weekday|  81624735|   13.15|       16.9|
|       2017| Weekend|  31875651|   12.79|       15.8|
+-----------+--------+----------+--------+-----------+
only showing top 20 rows

In [59]:
# 4. Monthly patterns across years
print("\n4. Monthly Patterns Across Years:")
monthly_patterns = spark.sql("""
    SELECT 
        pickup_month,
        COUNT(*) as total_trips,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(trip_distance), 2) as avg_distance
    FROM nyc_taxi_multiyear
    WHERE pickup_year IS NOT NULL
    GROUP BY pickup_month
    ORDER BY pickup_month
""")
monthly_patterns.show()
4. Monthly Patterns Across Years:
+------------+-----------+--------+------------+
|pickup_month|total_trips|avg_fare|avg_distance|
+------------+-----------+--------+------------+
|           1|   79868564|   12.73|        4.87|
|           2|   78166054|   12.84|         4.4|
|           3|   84113819|   13.26|         6.1|
|           4|   79251283|    13.5|         8.2|
|           5|   80370596|   13.96|        5.74|
|           6|   76400708|   13.96|        5.21|
|           7|   70488744|   13.77|        4.28|
|           8|   68866041|   13.82|       13.65|
|           9|   72288781|   14.15|       10.13|
|          10|   79473617|   14.02|        5.91|
|          11|   74256588|   13.87|        6.78|
|          12|   74733422|   11.56|        8.51|
+------------+-----------+--------+------------+